The HJB equation, is used in dynamic programming to solve optimisation problem. Optimisation problems occur in all walks of life and some can even be solved. And some of those that can be solved are best solved with dynamic programming, a recursive evaluation of the best descission.
This notebook aims to explain by way of demonstration how to solve the HJB and fidn the optimal set of descissions, but also provide an easy to use base to formuate and solve your optimisation problems.
In [1]:
import numpy as np
import time
import matplotlib.pyplot as plt
In [1]:
class DynamicProgram(object):
"""
Generate a dynamic program to find a set of optimal descissions using the HJB.
define the program by:
Setting intial states via: set_inital_state(list or int)
Setting the number of steps via: set_step_number(int)
Add a set of descissions: add_decisions_set(set)
Add a cost function: add_cost_function(function in terms of state )
Add a state change equation: add_state_eq(function(state))
Add an expression for the last value: add_final_value_expression(function(state,settings))
Add limits on the states: add_state_limits(lower=list or int,upper = list or int)
See below for examples:
"""
def __init__(self):
self.settings = {
'Lower state limits' : [],
'Upper state limits' : [],
'x_set' : set(),
'cache' : {},}
def add_state_eq(self,function):
"""Returns a tuple describing the states.
Remember to increment the first state, b convention the number of steps.
Load additional parameters (usually needed for cost and state value with global variables)
Example of a function that changes the state by the decission:
def new_state(x,s):
return (s[0]+1,s[1]+x) #Return a tuple, use (s[:-1]+(5,)) to slice tuples.
"""
self.settings['State eq.'] = function
def add_cost_function(self,function):
"""
Returns a float or integer describing the cost value.
Load additional parameters (usually needed for cost and state value with global variables)
Example is a function that simply returns the decision as cost:
def cost(x,s):
return x
"""
self.settings['Cost eq.'] = function
def add_final_value_expression(self, function,):
"""
Returns a float or integer as the final value:
Example is a function that returns the ratio of the initial state and the final state:
def val_T(s,settings):
return s[1]/float(settings['Initial state'][1])
"""
self.settings['Final value'] = function
def set_step_number(self,step_number):
"""Number of stages / steps. Integer"""
self.settings['T'] = step_number
def set_inital_state(self,intial_values):
"""Provide the inital state of the states other than the stage number"""
if type(intial_values) is list:
self.settings['Initial state'] = intial_values
self.settings['Initial state'].insert(0,0)
elif type(intial_values) is int:
self.settings['Initial state'] = [intial_values]
self.settings['Initial state'].insert(0,0)
self.settings['Initial state'] = tuple(self.settings['Initial state'])
def add_state_limits(self,lower=[],upper=[]):
"""Add the limits on the state other than the stage number, leave empty if none"""
if type(lower) is list:
self.settings['Lower state limits'].extend(lower)
self.settings['Upper state limits'].extend(upper)
elif type(lower) is int:
self.settings['Lower state limits'] = [lower]
self.settings['Upper state limits'] = [upper]
def solve(self):
"""
Solves the HJB. Returns the optimal value.
Path and further info is stored in the cache. Access it via
retrieve_decisions()
"""
self.settings['cache'] ={}
return self._hjb_(self.settings['Initial state'])
def retrieve_decisions(self):
"""
Retrieve the decisions that led to the optimal value
Returns the cost for the different states, the optimal schedule and the states
that the schedule results in.
"""
sched = np.ones(self.settings['T'])*np.nan
cost_calc= 0
states = []
s = self.settings['Initial state']
t = 0
while t < self.settings['T']:
sched[t] = self.settings['cache'][s][1]
cost_calc += self.settings['Cost eq.'](sched[t],s)
states.append(s[1:])
s = self.settings['State eq.'](sched[t],s)
t += 1
states.append(s[1:])
return cost_calc, sched, states
def return_settings(self):
return self.settings
def return_cache(self):
return self.settings['cache']
def add_decisions_set(self,set_of_decisions):
"""
Add a set of permissable decissions. Must be set of unique integers.
"""
if set(set_of_decisions) != set_of_decisions:
raise TypeError('Expected a set unique values, use set() to declare a set')
self.settings['x_set'] = set(set_of_decisions)
def add_setting(self,key,value):
self.settings[key] = value
def _hjb_(self,s):
if self.settings['cache'].has_key(s):
return self.settings['cache'][s][0]
# check state bounds
for c,i in enumerate(s[1:]):
if i < self.settings['Lower state limits'][c] or i > self.settings['Upper state limits'][c]:
return float('inf')
#Check if reached time step limit:
if s[0] == self.settings['T']:
m = self.settings['Final value'](s,self.settings)
self.settings['cache'][s] = [m, np.nan]
return m
# Else enter recursion
else:
### Make decision variable vector ###
######################################################################################
# Faster but only with integer decisions
p=[]
for x in self.settings['x_set']:
p.append(self.settings['Cost eq.'](x,s)+self._hjb_(self.settings['State eq.'](x,s)))
m = min(p)
### Slower but with any imutable decisions, uncomment if desired: ###
# p ={}
# for x in self.settings['x_set']:
# p[x] = self.settings['Cost eq.'](x,s)+self._hjb_(self.settings['State eq.'](x,s))
# m = min(p, key=p.get)
########################################################################################
##############################################################
## Finding the index of the best solution ##
for x in self.settings['x_set']:
if m == p[x]:
pp = x
################################################################
self.settings['cache'][s] = [m, pp]
return m
In [ ]:
In [ ]:
We can also solve classic dynamic programming problems such as the knapsack problem, hannoi towers or the fibonacci number calculation. Blank functions are outlined below.
The functions must fulfill a range of conditions: $$ f:\mathcal{S}^n\times x \rightarrow \mathbb{R} $$ where $\mathcal{S}$ is the set of permissable states, $x$ the decision variable and $n$ the number of dimensions of the state. These are defined by $x \in \mathcal{X}$ the \texttt{set()} of permissable decisions. The state $s \in \mathcal{S}^n$ where $n \geq 2$ and $\mathcal{S} \subset \mathbb{R} $ and is finite.
The new state is defined by a function such that: $$ f:\mathcal{S}^n\times x \rightarrow \mathcal{S}^n $$
The value of the final stage is defined as:
$$
f:\mathcal{S}^n \rightarrow \mathbb{R}
$$
which as an impimentation detail has the required argument settings, to which features can be added through add_setting(key,value)
where key must be a new and unique dictionary key and value may be any permissable dictionary entry.
In [55]:
#help(DynamicProgram)
In [56]:
def cost(x,s):
"""Return a float or integer"""
pass
def new_state(x,s):
"""Return a tuple"""
pass
def val_T(s,settings):
"""Return a float or int"""
pass
We can solve a very simple pump optimsiation where the state of water in a tank is given by h and described by: $$ s_{new} = \begin{cases} (t+1,h-1) & \text{if } x = 0 \\ (t+1,h+1) & \text{if } x = 1 \\ (t+1,h+1.5) & \text{if } x = 2\end{cases} $$ The operating cost are described by: $$ cost = tarrif(t)\times x $$ where $x$ is the descission variable. The final value is given by: $$ V_T = \begin{cases} 0 & \text{if: } h_T \geq h_0 \\ Inf &\text{otherwise} \end{cases} $$
In [57]:
def simple_cost(x,s):
tariff = [19, 8, 20, 3, 12, 14, 0, 4, 3, 13, 11, 13, 13, 11, 16, 14, 16,
19, 1, 8, 0, 4, 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
12, 3, 18, 15, 3, 10, 12, 6, 3, 5, 11, 0, 11, 8, 10, 11, 5,
15, 8, 2, 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
9, 10, 13, 7, 7, 1, 12, 2, 2, 1, 5, 8, 4, 0, 11, 2, 5,
16, 8, 1, 17, 16, 3, 0, 4, 16, 0, 7]
return tariff[s[0]]*x
def val_T(s,settings):
if s[1] < settings['Initial state'][1]:
return float('inf')
else:
return 0
def simple_state(x,s):
#print s
if x == 0:
return (s[0]+1,s[1]-1)
elif x == 1:
return (s[0]+1,s[1]+1)
elif x == 2:
return (s[0]+1,s[1]+1.5)
In [68]:
pumping = DynamicProgram()
pumping.set_step_number(96)
pumping.add_decisions_set({0,1,2})
pumping.add_cost_function(simple_cost)
pumping.add_state_eq(simple_state)
pumping.add_final_value_expression(val_T)
pumping.add_state_limits(lower=0,upper = 200)
pumping.set_inital_state(100)
pumping.return_settings()
#pumping.retrieve_decisions()
Out[68]:
In [69]:
In [66]:
Out[66]:
We can have more than one state variable. For example we can add a second tank and now pump to either of them: Here they have similar equations, but they can be completly independant, of each other. Any cost and state function that meets the requirments above is allowed.
In [9]:
def simple_state2(x,s):
if x == 0:
return (s[0]+1,s[1]-1,s[2]-1)
elif x == 1:
return (s[0]+1,s[1]+1,s[2])
elif x == 2:
return (s[0]+1,s[1] ,s[2]+2)
# We also need to update the final value function.
def val_T2(s,settings):
if s[1] < settings['Initial state'][1] or s[2] < settings['Initial state'][2]:
return float('inf')
else:
return 0
# For now we leave the cost function the same, but that could also be changed.
One final example is the checkerboard problem as outlined here: https://en.wikipedia.org/wiki/Dynamic_programming#Checkerboard
In [10]:
board = np.array([[1,3,4,6],
[2,6,2,1],
[7,3,2,1],
[0,4,2,9]])
def cost(x,s):
"""Return a float or integer"""
global board
return board[s[0],s[1]]
def new_state(x,s):
"""Return a tuple"""
global board
return (int(s[0]+1),int(s[1]+x))
def val_T(s,settings):
"""Return a float or int"""
global board
return board[s[0],s[1]]
Checkerboard = DynamicProgram()
Checkerboard.add_cost_function(cost)
Checkerboard.add_decisions_set({-1,0,1})
Checkerboard.add_final_value_expression(val_T)
Checkerboard.add_state_eq(new_state)
Checkerboard.add_state_limits(lower=-1,upper=3)
Checkerboard.set_inital_state(2)
Checkerboard.set_step_number(3)
Checkerboard.solve()
print Checkerboard.retrieve_decisions()
An example we can solve is the water allocation problem from the tutorial sheets:
Consider a water supply allocation problem. Suppose that a quantity Q can be allocated to three water users (indices j=1, 2 and 3), what is allocation x4 which maximises the total net benefits? The gross benefit resulting from the allocation of $x_j$ to j is:
$$ a_j(1-exp(-b_jx_j)) $$Subject to $ (a_j, b_j > 0). $ Moreover, the costs of this allocation are:
$c_jx_j^{d_j}$ ($c_j$, $d_j >0$ and $d_j<1$, because of economy of scale).
The values of the constants are: Q=5
$a_j$ | $b_j$ | $c_j$ | $d_j$ | |
---|---|---|---|---|
j=1 | 100 | 0.1 | 10 | 0.6 |
j=2 | 50 | 0.4 | 10 | 0.8 |
j=3 | 100 | 0.2 | 25 | 0.4 |
Solve the problem in the discrete case (i.e. assuming that $x_j$ is an integer).
The optimisation problem is therefore: $$ Maximise \sum_j (a_j(1-exp(-b_jx_j)) - c_jx_j^{d_j} $$ $$ subject to: \sum_j x_j \leq Q $$ $$ x_j \geq 0 $$
This problem is a non-linear mixed integer problem and quite expensive to solve as branch and bound problem. But with DP it is much easier. (The continous case, may be much easier as NLP than DP, this shows how important it is to pay attenttion ot the problem type)
The decisions are 0 ... 5 The states are the amount of water allocated.
In [ ]:
Costs = np.array([[100,0.1, 10,0.6],
[50, 0.4, 10,0.8],
[100,0.2, 25,0.4]])
def value(x,s):
global Costs
return Costs[s[0],0]*(1-math.exp(-Costs[s[0],1]*x))
def cost(x,s):
"""Return a float or integer"""
global Costs
if
return -value(x,s) +Costs[s[0],2]*x**Costs[s[0],3]
def new_state(x,s):
"""Return a tuple"""
return s[1]-x
def val_T(s,settings):
"""Return a float or int"""
pass
Where the probability $p_{i,j}$ is the probaility of jumping from state $i$ to state $j$. Currently the transition matrix is invariate, however this can be easily implimented with P as a list of lists.
In [19]:
class StochasticProgram(DynamicProgram):
"""
Adds a stochastic component to the dynamic program.
state now is: s where s[0] is the step s[1:-1] is the states of the system and s[-1] is the stochastic state
The transition matrix for the markov chain describing the stochastic bhavior is added by:
add_transition_matrix(P) with P as a list of lists.
"""
def add_transition_matrix(self,P):
"""
Add the transition matrix as list of lists
eg. P = [[0.4,0.5,0.1],
[0.2,0.6,0.2],
[0.1,0.5,0.4]]
"""
self.settings['P'] = np.array(P)
self.settings['Len P'] = len(P)
def retrieve_decisions(self):
"""
Retrieve the decisions that led to the optimal value
Returns the cost for the different states, the optimal schedule and the states that the schedule results in.
"""
schedule = []
cost_calc= np.zeros(self.settings['Len P'])
states = []
for i in range(self.settings['Len P']):
schedule_part = []
states_part = []
s = self.settings['Initial state']
t = 0
while t < self.settings['T']:
schedule_part.append(self.settings['cache'][s][1])
cost_calc[i] += self.settings['Cost eq.'](schedule_part[t],s)
states_part.append(s[1:])
s = self.settings['State eq.'](schedule_part[t],(s[:-1]+(i,) ) )
t += 1
states_part.append(s[1:])
states.append(states_part)
schedule.append(schedule_part)
return cost_calc, schedule, states
def solve(self):
"""
Solves the HJB. Returns the optimal value.
Path and further info is stored in the cache. Access it via
retrieve_decisions()
"""
return self._hjb_stoch_(self.settings['Initial state'])
def _hjb_stoch_(self,s):
if self.settings['cache'].has_key(s):
return self.settings['cache'][s][0]
# check state bounds
for c,i in enumerate(s[1:-1]):
if i < self.settings['Lower state limits'][c] or i > self.settings['Upper state limits'][c]:
return float('inf')
#Check if reached time step limit:
if s[0] == self.settings['T']:
m = self.settings['Final value'](s,self.settings)
self.settings['cache'][s] = [m, np.nan]
return m
# Else enter recursion
else:
p=[]
for x in self.settings['x_set']:
#future = 0
# for i in range(self.settings['Len P']):
# #print self.hjb_stoch(self.settings['State eq.'](x,(s[0:-1]+(i,)))), self.settings['P'][s[-1],i]
# future += self.hjb_stoch(self.settings['State eq.'](x,(s[0:-1]+(i,))))*self.settings['P'][s[-1],i]
future = sum(self._hjb_stoch_(self.settings['State eq.'](x,(s[0:-1]+ (i,))))
*self.settings['P'][s[-1],i] for i in range(self.settings['Len P']))
p.append(self.settings['Cost eq.'](x,s) + future)
m = min(p)
for x in self.settings['x_set']:
if m == p[x]:
pp = x
self.settings['cache'][s] = [m, pp]
return m
In [20]:
help(StochasticProgram)
The cost of operating a pump with a given wind turbine power input given by a certain state is given by: $$ cost(x,t,h,j) := \begin{cases} T(t) \times (x \times P_p - W(t,j)) & \text{if} +ve \\ E_{xp} \times (x \times P_p - W(t,j)) & \text{if} -ve\end{cases} $$ where $W(t,j)$ is the wind power output at time $t$ with an error state $j$.
In [ ]:
In [21]:
# Convention s = t,h,j
def stoch_simple_state(x,s):
#print s
if x == 0:
return (s[0]+1,s[1]-1,s[2])
elif x == 1:
return (s[0]+1,s[1]+1,s[2])
elif x == 2:
return (s[0]+1,s[1]+1.5,s[2])
def err_corr_wind_power_cost(x,s):
Tariff = [5,5,5,5,5,8,8,8,8,8,12,12,12,12,12,50,50,50,50,20,20,6,5,5]
Wind = [46, 1, 3, 36, 30, 19, 9, 26, 35, 5, 49, 3, 6, 36, 43, 36, 14,
34, 2, 0, 0, 30, 13, 36]
diff = np.array([-1,0,1])*3
Export_price = 5.5
wind_out = Wind[s[0]]+diff[s[2]]
if wind_out <= 0:
wind_out = 0
power_con = x*60-wind_out
if power_con >= 0:
return power_con*Tariff[s[0]]
else:
return power_con*Export_price
def val_T(s,settings):
if s[1] < settings['Initial state'][1]:
return float('inf')
else:
return 0
transition = np.array([[0.4,0.5,0.1],[0.2,0.6,0.2],[0.1,0.5,0.4]])
In [ ]:
In [23]:
pumping_stoch = StochasticProgram()
pumping_stoch.add_decisions_set({0,1,2})
pumping_stoch.add_cost_function(err_corr_wind_power_cost)
pumping_stoch.add_state_eq(stoch_simple_state)
pumping_stoch.add_final_value_expression(val_T)
pumping_stoch.add_state_limits(lower=[0,0],upper = [200,3])
pumping_stoch.set_inital_state([100,1])
pumping_stoch.set_step_number(24)
pumping_stoch.add_transition_matrix(transition)
#print pumping_stoch.settings
pumping_stoch.solve()
#pumping_stoch.retrieve_decisions()
Out[23]:
In [24]:
pumping_stoch.retrieve_decisions()
Out[24]:
The cost of operating a pump with a given wind turbine power input is given by: $$ cost(x,t,h) := \begin{cases} T(t) \times (x \times P_p - W(t)) & \text{if} +ve \\ E_{xp} \times (x \times P_p - W(t)) & \text{if} -ve\end{cases} $$ where $x$ is the descision variable, $W(t)$ is the wind turbine output in time step $t$. $P_p$ is the pump power, $E_{xp}$ is the export price.
In [4]:
def wind_power_cost(x,s):
"""Very simple cost function for a pump with wind turbine power"""
Tariff = [5,5,5,5,5,8,8,8,8,8,12,12,12,12,12,50,50,50,50,20,20,6,5,5]
Wind = [46, 1, 3, 36, 30, 19, 9, 26, 35, 5, 49, 3, 6, 36, 43, 36, 14,
34, 2, 0, 0, 30, 13, 36]
Export_price = 5.5
power_con = x*60-Wind[s[0]]
if power_con >= 0:
return power_con*Tariff[s[0]]
else:
return power_con*Export_price
In [27]:
def stoch_wind_power_cost(x,s):
Tariff = [5,5,5,5,5,8,8,8,8,8,12,12,12,12,12,50,50,50,50,20,20,6,5,5]
Wind = [46, 1, 3, 36, 30, 19, 9, 26, 35, 5, 49, 3, 6, 36, 43, 36, 14,
34, 2, 0, 0, 30, 13, 36]
Export_price = 5.5
wind_out = sum(Wind[s[0]]*i for i in settings['P'][s[2]])
power_con = x*60-wind_out
if power_con >= 0:
return power_con*Tariff[s[0]]
else:
return power_con*Export_price
In [32]:
assert(hjb((settings['T'],settings['H_init']-1)) == 10000)
assert(hjb((settings['T'],settings['H_init'])) == 0)
assert(hjb((settings['T']-1,settings['H_min']-1)) == 10000)
assert(hjb((settings['T']-1,settings['H_max']+1)) == 10000)
state is given by $s = (t,h,i)$
$$ v(s) = \min_x(cost(x,s) + \sum_j p_{ij} v(new state(x,s))) $$
In [34]:
def make_schedule(settings):
sched = np.ones(settings['T'])*np.nan
cost_calc= 0
elev = np.ones(settings['T']+1)*np.nan
s = settings['Initial state']
t = 0
while t < settings['T']:
sched[t] = settings['cache'][s][1]
print sched[t]
cost_calc += settings['Cost eq.'](sched[t],s)
elev[t] = s[1]
s = settings['State eq.'](sched[t],s)
t += 1
elev[settings['T']] = s[1]
return cost_calc, sched, elev
In [112]:
def make_schedule2(settings):
sched_stack = []
cost_summary = []
string_stack = []
elev = np.ones(settings['T']+1)*np.nan
for ij in [0,1,2]:
cost_calc = 0
string_stack.insert(i,[])
s = settings['Initial state']
t = 0
#string_stack[ij].insert(0,[])
#string_stack[ij].insert(0,'{0:2} {1} {2}'.format(t,settings['cache'][s][1], s[1:]))
string_stack[ij].insert(0,'{0}'.format(s))
while t < settings['T']:
state = tuple(sk if sk in s[:-1] else ij for sk in s )
print s, s[:-1], state
x = settings['cache'][state][1]
print x
cost_calc += settings['Cost eq.'](x,s)
elev[t] = s[1]
s = settings['State eq.'](x,s)
t += 1
string_stack[ij].insert(t,'{0} {1} {2}'.format(x,s[1:],ij) )
elev[settings['T']] = s[1]
return string_stack, cost_calc
In [113]:
make_schedule2(settings)
Out[113]:
In [74]:
settings['cache']
Out[74]:
In [49]:
def stoch_hjb(s):
global settings
if settings['cache'].has_key(s):
return settings['cache'][s][0]
if s[0] == settings['T'] and s[1] < settings['H_init']:
return 10000
elif s[0] == settings['T'] and s[1] >= settings['H_init']:
return 0
elif s[1] < settings['H_min'] or s[1] > settings['H_max']:
return 10000
else:
p=[]
for x in settings['x_set']:
future = sum(stoch_hjb(settings['State eq.'](x,(s[0],s[1],i)))
*settings['P'][s[2]][i] for i in [0,1,2])
p.append(settings['Cost eq.'](x,s) + future)
m = min(p)
for x in settings['x_set']:
if m == p[x]:
pp = x
settings['cache'][s] = [m, pp]
return m
In [75]:
A = []
for i in [0,1,2]:
A.insert(i,[])
print A
for t in range(5):
A[i].insert(t,i*t**2)
A
Out[75]:
In [66]:
cost_calc, sched, elev = make_schedule(settings)
print sched
print elev
print cost_calc
In [55]:
settings['cache'][settings['Initial state']]
Out[55]:
In [20]:
cost_calc, sched, elev = make_schedule(settings)
print sched
print elev
print cost_calc
In [39]:
dic = {'blub': simple_cost
}
dic['blub']
Out[39]:
In [40]:
dic['blub'](1,(0,1))
Out[40]:
In [11]:
len([5,5,5,5,5,8,8,8,8,8,12,12,12,12,12,50,50,50,50,20,20,6,5,5])
Out[11]:
In [12]:
np.random.randint(50,size=24)
Out[12]:
In [47]:
x = [100,7,90,787]
x_lim_min = [0,10,0,0]
x_lim_max = [100,10,100,1000]
for c,i in enumerate(x):
if i < x_lim_min[c] or i > x_lim_max[c]:
print 1000
In [ ]: